package com.bootlever.mq.consumer.mq;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.bootlever.framework.rocketmq.templates.RocketmqTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import java.util.List;

public class ConsumerListener implements ApplicationListener<ContextRefreshedEvent> {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerListener.class);

    @Autowired
    private RocketmqTemplate rocketmqTemplate;

    public void onApplicationEvent(ContextRefreshedEvent event) {
        logger.info("启动consumer");
        try {
            DefaultMQPushConsumer consumer = (DefaultMQPushConsumer) rocketmqTemplate.
                    getDefaultMQPushConsumer("test_consumer");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("topic1","*");
            consumer.registerMessageListener(new MessageListenerConcurrently(){

                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                 ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg : msgs) {
                            String topic = msg.getTopic();
                            String msgBody = new String(msg.getBody(), "utf-8");
                            String tags = msg.getTags();
                            logger.info("收到消息topic:{},tags :{},msg:{}",topic, tags, msgBody);
                        }
                    } catch(Exception e) {
                        e.printStackTrace();
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            logger.info("consumer:{}启动成功", "test_consumer");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
